{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Introducing MLib package of PySpark "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Load and transform the data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Just like in the previous chapter, we first specify the schema of our dataset."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import pyspark.sql.types as typ\n",
    "\n",
    "labels = [\n",
    "    ('INFANT_ALIVE_AT_REPORT', typ.StringType()),\n",
    "    ('BIRTH_YEAR', typ.IntegerType()),\n",
    "    ('BIRTH_MONTH', typ.IntegerType()),\n",
    "    ('BIRTH_PLACE', typ.StringType()),\n",
    "    ('MOTHER_AGE_YEARS', typ.IntegerType()),\n",
    "    ('MOTHER_RACE_6CODE', typ.StringType()),\n",
    "    ('MOTHER_EDUCATION', typ.StringType()),\n",
    "    ('FATHER_COMBINED_AGE', typ.IntegerType()),\n",
    "    ('FATHER_EDUCATION', typ.StringType()),\n",
    "    ('MONTH_PRECARE_RECODE', typ.StringType()),\n",
    "    ('CIG_BEFORE', typ.IntegerType()),\n",
    "    ('CIG_1_TRI', typ.IntegerType()),\n",
    "    ('CIG_2_TRI', typ.IntegerType()),\n",
    "    ('CIG_3_TRI', typ.IntegerType()),\n",
    "    ('MOTHER_HEIGHT_IN', typ.IntegerType()),\n",
    "    ('MOTHER_BMI_RECODE', typ.IntegerType()),\n",
    "    ('MOTHER_PRE_WEIGHT', typ.IntegerType()),\n",
    "    ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),\n",
    "    ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),\n",
    "    ('DIABETES_PRE', typ.StringType()),\n",
    "    ('DIABETES_GEST', typ.StringType()),\n",
    "    ('HYP_TENS_PRE', typ.StringType()),\n",
    "    ('HYP_TENS_GEST', typ.StringType()),\n",
    "    ('PREV_BIRTH_PRETERM', typ.StringType()),\n",
    "    ('NO_RISK', typ.StringType()),\n",
    "    ('NO_INFECTIONS_REPORTED', typ.StringType()),\n",
    "    ('LABOR_IND', typ.StringType()),\n",
    "    ('LABOR_AUGM', typ.StringType()),\n",
    "    ('STEROIDS', typ.StringType()),\n",
    "    ('ANTIBIOTICS', typ.StringType()),\n",
    "    ('ANESTHESIA', typ.StringType()),\n",
    "    ('DELIV_METHOD_RECODE_COMB', typ.StringType()),\n",
    "    ('ATTENDANT_BIRTH', typ.StringType()),\n",
    "    ('APGAR_5', typ.IntegerType()),\n",
    "    ('APGAR_5_RECODE', typ.StringType()),\n",
    "    ('APGAR_10', typ.IntegerType()),\n",
    "    ('APGAR_10_RECODE', typ.StringType()),\n",
    "    ('INFANT_SEX', typ.StringType()),\n",
    "    ('OBSTETRIC_GESTATION_WEEKS', typ.IntegerType()),\n",
    "    ('INFANT_WEIGHT_GRAMS', typ.IntegerType()),\n",
    "    ('INFANT_ASSIST_VENTI', typ.StringType()),\n",
    "    ('INFANT_ASSIST_VENTI_6HRS', typ.StringType()),\n",
    "    ('INFANT_NICU_ADMISSION', typ.StringType()),\n",
    "    ('INFANT_SURFACANT', typ.StringType()),\n",
    "    ('INFANT_ANTIBIOTICS', typ.StringType()),\n",
    "    ('INFANT_SEIZURES', typ.StringType()),\n",
    "    ('INFANT_NO_ABNORMALITIES', typ.StringType()),\n",
    "    ('INFANT_ANCEPHALY', typ.StringType()),\n",
    "    ('INFANT_MENINGOMYELOCELE', typ.StringType()),\n",
    "    ('INFANT_LIMB_REDUCTION', typ.StringType()),\n",
    "    ('INFANT_DOWN_SYNDROME', typ.StringType()),\n",
    "    ('INFANT_SUSPECTED_CHROMOSOMAL_DISORDER', typ.StringType()),\n",
    "    ('INFANT_NO_CONGENITAL_ANOMALIES_CHECKED', typ.StringType()),\n",
    "    ('INFANT_BREASTFED', typ.StringType())\n",
    "]\n",
    "\n",
    "schema = typ.StructType([\n",
    "        typ.StructField(e[0], e[1], False) for e in labels\n",
    "    ])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next, we load the data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "births = spark.read.csv('births_train.csv.gz', \n",
    "                        header=True, \n",
    "                        schema=schema)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": false
   },
   "source": [
    "Specify our recode dictionary."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "recode_dictionary = {\n",
    "    'YNU': {\n",
    "        'Y': 1,\n",
    "        'N': 0,\n",
    "        'U': 0\n",
    "    }\n",
    "}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Our goal is to predict whether the `'INFANT_ALIVE_AT_REPORT'` is either 1 or 0. Thus, we will drop all of the features that relate to the infant."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "selected_features = [\n",
    "    'INFANT_ALIVE_AT_REPORT', \n",
    "    'BIRTH_PLACE', \n",
    "    'MOTHER_AGE_YEARS', \n",
    "    'FATHER_COMBINED_AGE', \n",
    "    'CIG_BEFORE', \n",
    "    'CIG_1_TRI', \n",
    "    'CIG_2_TRI', \n",
    "    'CIG_3_TRI', \n",
    "    'MOTHER_HEIGHT_IN', \n",
    "    'MOTHER_PRE_WEIGHT', \n",
    "    'MOTHER_DELIVERY_WEIGHT', \n",
    "    'MOTHER_WEIGHT_GAIN', \n",
    "    'DIABETES_PRE', \n",
    "    'DIABETES_GEST', \n",
    "    'HYP_TENS_PRE', \n",
    "    'HYP_TENS_GEST', \n",
    "    'PREV_BIRTH_PRETERM'\n",
    "]\n",
    "\n",
    "births_trimmed = births.select(selected_features)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Specify the recoding methods."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import pyspark.sql.functions as func\n",
    "\n",
    "def recode(col, key):        \n",
    "    return recode_dictionary[key][col] \n",
    "\n",
    "def correct_cig(feat):\n",
    "    return func \\\n",
    "        .when(func.col(feat) != 99, func.col(feat))\\\n",
    "        .otherwise(0)\n",
    "\n",
    "rec_integer = func.udf(recode, typ.IntegerType())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Correct the features related to the number of smoked cigarettes."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "births_transformed = births_trimmed \\\n",
    "    .withColumn('CIG_BEFORE', correct_cig('CIG_BEFORE'))\\\n",
    "    .withColumn('CIG_1_TRI', correct_cig('CIG_1_TRI'))\\\n",
    "    .withColumn('CIG_2_TRI', correct_cig('CIG_2_TRI'))\\\n",
    "    .withColumn('CIG_3_TRI', correct_cig('CIG_3_TRI'))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Figure out which Yes/No/Unknown features are."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "cols = [(col.name, col.dataType) for col in births_trimmed.schema]\n",
    "\n",
    "YNU_cols = []\n",
    "\n",
    "for i, s in enumerate(cols):\n",
    "    if s[1] == typ.StringType():\n",
    "        dis = births.select(s[0]) \\\n",
    "            .distinct() \\\n",
    "            .rdd \\\n",
    "            .map(lambda row: row[0]) \\\n",
    "            .collect()\n",
    "\n",
    "        if 'Y' in dis:\n",
    "            YNU_cols.append(s[0])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "DataFrames can transform the features *in bulk* while selecting features."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(INFANT_NICU_ADMISSION='Y', INFANT_NICU_ADMISSION_RECODE=1),\n",
       " Row(INFANT_NICU_ADMISSION='Y', INFANT_NICU_ADMISSION_RECODE=1),\n",
       " Row(INFANT_NICU_ADMISSION='U', INFANT_NICU_ADMISSION_RECODE=0),\n",
       " Row(INFANT_NICU_ADMISSION='N', INFANT_NICU_ADMISSION_RECODE=0),\n",
       " Row(INFANT_NICU_ADMISSION='U', INFANT_NICU_ADMISSION_RECODE=0)]"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "births.select([\n",
    "        'INFANT_NICU_ADMISSION', \n",
    "        rec_integer(\n",
    "            'INFANT_NICU_ADMISSION', func.lit('YNU')\n",
    "        ) \\\n",
    "        .alias('INFANT_NICU_ADMISSION_RECODE')]\n",
    "     ).take(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Transform all the `YNU_cols` in one using a list of transformations."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "exprs_YNU = [\n",
    "    rec_integer(x, func.lit('YNU')).alias(x) \n",
    "    if x in YNU_cols \n",
    "    else x \n",
    "    for x in births_transformed.columns\n",
    "]\n",
    "\n",
    "births_transformed = births_transformed.select(exprs_YNU)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's check if we got it correctly."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------+-------------+------------+-------------+------------------+\n",
      "|DIABETES_PRE|DIABETES_GEST|HYP_TENS_PRE|HYP_TENS_GEST|PREV_BIRTH_PRETERM|\n",
      "+------------+-------------+------------+-------------+------------------+\n",
      "|           0|            0|           0|            0|                 0|\n",
      "|           0|            0|           0|            0|                 0|\n",
      "|           0|            0|           0|            0|                 0|\n",
      "|           0|            0|           0|            0|                 1|\n",
      "|           0|            0|           0|            0|                 0|\n",
      "+------------+-------------+------------+-------------+------------------+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "births_transformed.select(YNU_cols[-5:]).show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Get to know your data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Descriptive statistics"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "We will use the `colStats(...)` method."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "MOTHER_AGE_YEARS: \t28.30 \t 6.08\n",
      "FATHER_COMBINED_AGE: \t44.55 \t 27.55\n",
      "CIG_BEFORE: \t1.43 \t 5.18\n",
      "CIG_1_TRI: \t0.91 \t 3.83\n",
      "CIG_2_TRI: \t0.70 \t 3.31\n",
      "CIG_3_TRI: \t0.58 \t 3.11\n",
      "MOTHER_HEIGHT_IN: \t65.12 \t 6.45\n",
      "MOTHER_PRE_WEIGHT: \t214.50 \t 210.21\n",
      "MOTHER_DELIVERY_WEIGHT: \t223.63 \t 180.01\n",
      "MOTHER_WEIGHT_GAIN: \t30.74 \t 26.23\n"
     ]
    }
   ],
   "source": [
    "import pyspark.mllib.stat as st\n",
    "import numpy as np\n",
    "\n",
    "numeric_cols = ['MOTHER_AGE_YEARS','FATHER_COMBINED_AGE',\n",
    "                'CIG_BEFORE','CIG_1_TRI','CIG_2_TRI','CIG_3_TRI',\n",
    "                'MOTHER_HEIGHT_IN','MOTHER_PRE_WEIGHT',\n",
    "                'MOTHER_DELIVERY_WEIGHT','MOTHER_WEIGHT_GAIN'\n",
    "               ]\n",
    "\n",
    "numeric_rdd = births_transformed\\\n",
    "                       .select(numeric_cols)\\\n",
    "                       .rdd \\\n",
    "                       .map(lambda row: [e for e in row])\n",
    "\n",
    "mllib_stats = st.Statistics.colStats(numeric_rdd)\n",
    "\n",
    "for col, m, v in zip(numeric_cols, \n",
    "                     mllib_stats.mean(), \n",
    "                     mllib_stats.variance()):\n",
    "    print('{0}: \\t{1:.2f} \\t {2:.2f}'.format(col, m, np.sqrt(v)))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For the categorical variables we will calculate the frequencies of their values."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "INFANT_ALIVE_AT_REPORT [(1, 23349), (0, 22080)]\n",
      "BIRTH_PLACE [('1', 44558), ('4', 327), ('3', 224), ('2', 136), ('7', 91), ('5', 74), ('6', 11), ('9', 8)]\n",
      "DIABETES_PRE [(0, 44881), (1, 548)]\n",
      "DIABETES_GEST [(0, 43451), (1, 1978)]\n",
      "HYP_TENS_PRE [(0, 44348), (1, 1081)]\n",
      "HYP_TENS_GEST [(0, 43302), (1, 2127)]\n",
      "PREV_BIRTH_PRETERM [(0, 43088), (1, 2341)]\n"
     ]
    }
   ],
   "source": [
    "categorical_cols = [e for e in births_transformed.columns \n",
    "                    if e not in numeric_cols]\n",
    "\n",
    "categorical_rdd = births_transformed\\\n",
    "                       .select(categorical_cols)\\\n",
    "                       .rdd \\\n",
    "                       .map(lambda row: [e for e in row])\n",
    "            \n",
    "for i, col in enumerate(categorical_cols):\n",
    "    agg = categorical_rdd \\\n",
    "        .groupBy(lambda row: row[i]) \\\n",
    "        .map(lambda row: (row[0], len(row[1])))\n",
    "        \n",
    "    print(col, sorted(agg.collect(), \n",
    "                      key=lambda el: el[1], \n",
    "                      reverse=True))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Correlations"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Correlations between our features."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CIG_BEFORE-to-CIG_1_TRI: 0.83\n",
      "CIG_BEFORE-to-CIG_2_TRI: 0.72\n",
      "CIG_BEFORE-to-CIG_3_TRI: 0.62\n",
      "CIG_1_TRI-to-CIG_BEFORE: 0.83\n",
      "CIG_1_TRI-to-CIG_2_TRI: 0.87\n",
      "CIG_1_TRI-to-CIG_3_TRI: 0.76\n",
      "CIG_2_TRI-to-CIG_BEFORE: 0.72\n",
      "CIG_2_TRI-to-CIG_1_TRI: 0.87\n",
      "CIG_2_TRI-to-CIG_3_TRI: 0.89\n",
      "CIG_3_TRI-to-CIG_BEFORE: 0.62\n",
      "CIG_3_TRI-to-CIG_1_TRI: 0.76\n",
      "CIG_3_TRI-to-CIG_2_TRI: 0.89\n",
      "MOTHER_PRE_WEIGHT-to-MOTHER_DELIVERY_WEIGHT: 0.54\n",
      "MOTHER_PRE_WEIGHT-to-MOTHER_WEIGHT_GAIN: 0.65\n",
      "MOTHER_DELIVERY_WEIGHT-to-MOTHER_PRE_WEIGHT: 0.54\n",
      "MOTHER_DELIVERY_WEIGHT-to-MOTHER_WEIGHT_GAIN: 0.60\n",
      "MOTHER_WEIGHT_GAIN-to-MOTHER_PRE_WEIGHT: 0.65\n",
      "MOTHER_WEIGHT_GAIN-to-MOTHER_DELIVERY_WEIGHT: 0.60\n"
     ]
    }
   ],
   "source": [
    "corrs = st.Statistics.corr(numeric_rdd)\n",
    "\n",
    "for i, el in enumerate(corrs > 0.5):\n",
    "    correlated = [\n",
    "        (numeric_cols[j], corrs[i][j]) \n",
    "        for j, e in enumerate(el) \n",
    "        if e == 1.0 and j != i]\n",
    "    \n",
    "    if len(correlated) > 0:\n",
    "        for e in correlated:\n",
    "            print('{0}-to-{1}: {2:.2f}' \\\n",
    "                  .format(numeric_cols[i], e[0], e[1]))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "We can drop most of highly correlated features. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "features_to_keep = [\n",
    "    'INFANT_ALIVE_AT_REPORT', \n",
    "    'BIRTH_PLACE', \n",
    "    'MOTHER_AGE_YEARS', \n",
    "    'FATHER_COMBINED_AGE', \n",
    "    'CIG_1_TRI', \n",
    "    'MOTHER_HEIGHT_IN', \n",
    "    'MOTHER_PRE_WEIGHT', \n",
    "    'DIABETES_PRE', \n",
    "    'DIABETES_GEST', \n",
    "    'HYP_TENS_PRE', \n",
    "    'HYP_TENS_GEST', \n",
    "    'PREV_BIRTH_PRETERM'\n",
    "]\n",
    "births_transformed = births_transformed.select([e for e in features_to_keep])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Statistical testing"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": false
   },
   "source": [
    "Run a Chi-square test to determine if there are significant differences for categorical variables."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "BIRTH_PLACE 0.0\n",
      "DIABETES_PRE 0.0\n",
      "DIABETES_GEST 0.0\n",
      "HYP_TENS_PRE 0.0\n",
      "HYP_TENS_GEST 0.0\n",
      "PREV_BIRTH_PRETERM 0.0\n"
     ]
    }
   ],
   "source": [
    "import pyspark.mllib.linalg as ln\n",
    "\n",
    "for cat in categorical_cols[1:]:\n",
    "    agg = births_transformed \\\n",
    "        .groupby('INFANT_ALIVE_AT_REPORT') \\\n",
    "        .pivot(cat) \\\n",
    "        .count()    \n",
    "\n",
    "    agg_rdd = agg \\\n",
    "        .rdd\\\n",
    "        .map(lambda row: (row[1:])) \\\n",
    "        .flatMap(lambda row: \n",
    "                 [0 if e == None else e for e in row]) \\\n",
    "        .collect()\n",
    "\n",
    "    row_length = len(agg.collect()[0]) - 1\n",
    "    agg = ln.Matrices.dense(row_length, 2, agg_rdd)\n",
    "    \n",
    "    test = st.Statistics.chiSqTest(agg)\n",
    "    print(cat, round(test.pValue, 4))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create the final dataset"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Create an RDD of `LabeledPoint`s\n",
    "\n",
    "We will use a hashing trick to encode the `'BIRTH_PLACE'` feature."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "import pyspark.mllib.feature as ft\n",
    "import pyspark.mllib.regression as reg\n",
    "\n",
    "hashing = ft.HashingTF(7)\n",
    "\n",
    "births_hashed = births_transformed \\\n",
    "    .rdd \\\n",
    "    .map(lambda row: [\n",
    "            list(hashing.transform(row[1]).toArray()) \n",
    "                if col == 'BIRTH_PLACE' \n",
    "                else row[i] \n",
    "            for i, col \n",
    "            in enumerate(features_to_keep)]) \\\n",
    "    .map(lambda row: [[e] if type(e) == int else e \n",
    "                      for e in row]) \\\n",
    "    .map(lambda row: [item for sublist in row \n",
    "                      for item in sublist]) \\\n",
    "    .map(lambda row: reg.LabeledPoint(\n",
    "            row[0], \n",
    "            ln.Vectors.dense(row[1:]))\n",
    "        )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Split into training and testing"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "Before we move to the modeling stage, we need to split our dataset into two sets: one we'll use for training and another one for testing."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "births_train, births_test = births_hashed.randomSplit([0.6, 0.4])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Predicting infant survival"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "### Logistic regression in Spark\n",
    "\n",
    "MLLib used to provide a logistic regression model estimated using a stochastic gradient descent (SGD) algorithm. This model has been deprecated in Spark 2.0 in favor of the `LogisticRegressionWithLBFGS` model. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "from pyspark.mllib.classification \\\n",
    "    import LogisticRegressionWithLBFGS\n",
    "\n",
    "LR_Model = LogisticRegressionWithLBFGS \\\n",
    "    .train(births_train, iterations=10)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's now use the model to predict the classes for our testing set."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "LR_results = (\n",
    "        births_test.map(lambda row: row.label) \\\n",
    "        .zip(LR_Model \\\n",
    "             .predict(births_test\\\n",
    "                      .map(lambda row: row.features)))\n",
    "    ).map(lambda row: (row[0], row[1] * 1.0))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's check how well or how bad our model performed."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Area under PR: 0.85\n",
      "Area under ROC: 0.63\n"
     ]
    }
   ],
   "source": [
    "import pyspark.mllib.evaluation as ev\n",
    "LR_evaluation = ev.BinaryClassificationMetrics(LR_results)\n",
    "\n",
    "print('Area under PR: {0:.2f}' \\\n",
    "      .format(LR_evaluation.areaUnderPR))\n",
    "print('Area under ROC: {0:.2f}' \\\n",
    "      .format(LR_evaluation.areaUnderROC))\n",
    "LR_evaluation.unpersist()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Selecting only the most predictable features"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "MLLib allows us to select the most predictable features using a Chi-Square selector."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "selector = ft.ChiSqSelector(4).fit(births_train)\n",
    "\n",
    "topFeatures_train = (\n",
    "        births_train.map(lambda row: row.label) \\\n",
    "        .zip(selector \\\n",
    "             .transform(births_train \\\n",
    "                        .map(lambda row: row.features)))\n",
    "    ).map(lambda row: reg.LabeledPoint(row[0], row[1]))\n",
    "\n",
    "topFeatures_test = (\n",
    "        births_test.map(lambda row: row.label) \\\n",
    "        .zip(selector \\\n",
    "             .transform(births_test \\\n",
    "                        .map(lambda row: row.features)))\n",
    "    ).map(lambda row: reg.LabeledPoint(row[0], row[1]))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "### Random Forest in Spark\n",
    "\n",
    "We are now ready to build the random forest model. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "from pyspark.mllib.tree import RandomForest\n",
    "\n",
    "RF_model = RandomForest \\\n",
    "    .trainClassifier(data=topFeatures_train, \n",
    "                     numClasses=2, \n",
    "                     categoricalFeaturesInfo={}, \n",
    "                     numTrees=6,  \n",
    "                     featureSubsetStrategy='all',\n",
    "                     seed=666)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's see how well our model did."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 39,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Area under PR: 0.86\n",
      "Area under ROC: 0.63\n"
     ]
    }
   ],
   "source": [
    "RF_results = (\n",
    "        topFeatures_test.map(lambda row: row.label) \\\n",
    "        .zip(RF_model \\\n",
    "             .predict(topFeatures_test \\\n",
    "                      .map(lambda row: row.features)))\n",
    "    )\n",
    "\n",
    "RF_evaluation = ev.BinaryClassificationMetrics(RF_results)\n",
    "\n",
    "print('Area under PR: {0:.2f}' \\\n",
    "      .format(RF_evaluation.areaUnderPR))\n",
    "print('Area under ROC: {0:.2f}' \\\n",
    "      .format(RF_evaluation.areaUnderROC))\n",
    "RF_evaluation.unpersist()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's see how the logistic regression would perform with reduced number of features."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Area under PR: 0.85\n",
      "Area under ROC: 0.63\n"
     ]
    }
   ],
   "source": [
    "LR_Model_2 = LogisticRegressionWithLBFGS \\\n",
    "    .train(topFeatures_train, iterations=10)\n",
    "\n",
    "LR_results_2 = (\n",
    "        topFeatures_test.map(lambda row: row.label) \\\n",
    "        .zip(LR_Model_2 \\\n",
    "             .predict(topFeatures_test \\\n",
    "                      .map(lambda row: row.features)))\n",
    "    ).map(lambda row: (row[0], row[1] * 1.0))\n",
    "\n",
    "LR_evaluation_2 = ev.BinaryClassificationMetrics(LR_results_2)\n",
    "\n",
    "print('Area under PR: {0:.2f}' \\\n",
    "      .format(LR_evaluation_2.areaUnderPR))\n",
    "print('Area under ROC: {0:.2f}' \\\n",
    "      .format(LR_evaluation_2.areaUnderROC))\n",
    "LR_evaluation_2.unpersist()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
